Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new functionalities to reset offsets #194

Merged
merged 5 commits into from
Jun 4, 2024
Merged

Conversation

JulienPeloton
Copy link
Member

IMPORTANT: Please create an issue first before opening a Pull Request.
Linked to issue(s): #146

What changes were proposed in this pull request?

Checking offsets

You can now check where you are on the different queues, that is retrieving the offsets for each topic that you are polling:

fink_consumer --display_statistics

Topic [Partition]                                   Committed        Lag
========================================================================
fink_sso_ztf_candidates_ztf  [4]                            1        972
------------------------------------------------------------------------
Total for fink_sso_ztf_candidates_ztf                       1        972
------------------------------------------------------------------------

Topic [Partition]                                   Committed        Lag
========================================================================
------------------------------------------------------------------------
Total for fink_sso_fink_candidates_ztf                      0          2
------------------------------------------------------------------------

In this example, I have two topic, fink_sso_ztf_candidates_ztf and fink_sso_fink_candidates_ztf.

For the first topic, there is one active partition on the remote Kafka cluster that served data (number [4]). I polled 1 alert (Committed), and there are 972 remaining alerts to be polled (Lag). As there is only one active partition on the remote Kafka cluster, the total is the same (there could be up to 10 active partitions). For the second topic, I did not start polling as 0 alert has been Committed.

Resetting offsets

Sometimes you might want to poll again alerts, that is restarting to poll from the beginning of a queue. For this, you can use:

fink_consumer --display -start_at earliest
Resetting offsets to BEGINNING
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_ztf_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
# poll restarts at the first offset

All your topic partitions will be reset to the starting offset (0 in this case). Similarly, you can empty all topics, and restarting polling from the last offset:

fink_consumer --display -start_at latest
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=0,offset=0,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_fink_candidates_ztf,partition=4,offset=2,leader_epoch=None,error=None}
...
assign TopicPartition{topic=fink_sso_ztf_candidates_ztf,partition=4,offset=973,leader_epoch=None,error=None}
...
No alerts the last 10 seconds
...

Empty partitions will have offset=0, but others will have their offset to the latest one. The client will then wait for new data to come. Note that the reset will be actually triggered on the next poll. Hence the command fink_consumer --display_statistics will not right away display the reset offsets.
This is particularly useful after a bug in the topic (malformed alerts pushed), and you want a fresh restart.

How was this patch tested?

Manual

Copy link

sonarcloud bot commented Jun 4, 2024

Quality Gate Passed Quality Gate passed

Issues
7 New issues
0 Accepted issues

Measures
0 Security Hotspots
No data about Coverage
0.0% Duplication on New Code

See analysis details on SonarCloud

@JulienPeloton JulienPeloton merged commit 2a9a2e2 into master Jun 4, 2024
7 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant